package drds.binlog.parse;

import drds.binlog.common.AbstractLifeCycle;
import drds.binlog.common.Authors;
import drds.binlog.data_object.Entry;
import drds.binlog.data_object.EventType;
import drds.binlog.store.StoreException;
import drds.common.Author;
import lombok.Getter;
import lombok.Setter;
import org.springframework.util.Assert;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
 * 缓冲event队列，提供按事务刷新数据的机制
 */
@Author(name = Authors.LI_YANG)
public class EntryListTransaction extends AbstractLifeCycle
{

    private static final long init_index = -1;
    @Setter
    @Getter
    private int bufferSize = 1024;
    @Setter
    @Getter
    private int indexMask;
    @Setter
    @Getter
    private Entry[] entries;

    private AtomicLong putIndex = new AtomicLong(init_index); // 代表当前put操作最后一次写操作发生的位置
    private AtomicLong flushIndex = new AtomicLong(init_index); // 代表满足flush条件后最后一次数据flush的时间

    private EntryListTransactionFlush entryListTransactionFlush;

    public EntryListTransaction()
    {

    }

    public EntryListTransaction(EntryListTransactionFlush entryListTransactionFlush)
    {
        this.entryListTransactionFlush = entryListTransactionFlush;
    }

    public void start() throws StoreException
    {
        super.start();
        if (Integer.bitCount(bufferSize) != 1)
        {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }

        Assert.notNull(entryListTransactionFlush, "flush callback is null!");
        indexMask = bufferSize - 1;
        entries = new Entry[bufferSize];
    }

    public void stop() throws StoreException
    {
        putIndex.set(init_index);
        flushIndex.set(init_index);

        entries = null;
        super.stop();
    }

    public void add(List<Entry> entryList) throws InterruptedException
    {
        for (Entry entry : entryList)
        {
            add(entry);
        }
    }

    public void add(Entry entry) throws InterruptedException
    {
        switch (entry.getEntryType())
        {
            case transaction_begin:
                flush();// 刷新上一次的数据
                put(entry);
                break;
            case transaction_end:
                put(entry);
                flush();
                break;
            case rowdata:
                put(entry);
                // 针对非DML的数据，直接输出，不进行buffer控制
                EventType eventType = entry.getHeader().getEventType();
                if (eventType != null && !isDml(eventType))
                {
                    flush();
                }
                break;
            case heartbeat:
                // master过来的heartbeat，说明binlog已经读完了，是idle状态
                put(entry);
                flush();
                break;
            default:
                break;
        }
    }

    public void reset()
    {
        putIndex.set(init_index);
        flushIndex.set(init_index);
    }

    private void put(Entry entry) throws InterruptedException
    {
        // 首先检查是否有空位
        if (checkFreeSlotAt(putIndex.get() + 1))
        {
            long putIndex = this.putIndex.get();
            long putIndex$Next = putIndex + 1;

            // 先写数据，再更新对应的cursor,并发度高的情况，putSequence会被get请求可见，拿出了ringbuffer中的老的Entry值
            entries[getIndex(putIndex$Next)] = entry;
            this.putIndex.set(putIndex$Next);
        } else
        {
            flush();// buffer区满了，刷新一下
            put(entry);// 继续加一下新数据
        }
    }

    private void flush() throws InterruptedException
    {
        long flushIndex = this.flushIndex.get() + 1;
        long putIndex = this.putIndex.get();

        if (flushIndex <= putIndex)
        {
            List<Entry> entryList = new ArrayList<Entry>();
            for (long index = flushIndex; index <= putIndex; index++)
            {
                entryList.add(this.entries[getIndex(index)]);
            }

            entryListTransactionFlush.flush(entryList);
            this.flushIndex.set(putIndex);// flush成功后，更新flush位置
        }
    }

    /**
     * 查询是否有空位
     */
    private boolean checkFreeSlotAt(final long index)
    {
        final long $ = index - bufferSize;
        if ($ > flushIndex.get())
        { // 刚好追上一轮
            return false;
        } else
        {
            return true;
        }
    }

    private int getIndex(long index)
    {
        return (int) index & indexMask;
    }

    private boolean isDml(EventType eventType)
    {
        return eventType == EventType.insert || eventType == EventType.update || eventType == EventType.delete;
    }


}
